{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Parallel and Distributed Machine Learning\n",
    "\n",
    "[Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Types of Scaling\n",
    "\n",
    "There are a couple of distinct scaling problems you might face.\n",
    "The scaling strategy depends on which problem you're facing.\n",
    "\n",
    "1. Large Models: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.\n",
    "2. Large Datasets: Data is larger than RAM, and sampling isn't an option.\n",
    "\n",
    "![](static/ml-dimensions-color.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* For in-memory problems, just use scikit-learn (or your favorite ML library).\n",
    "* For large models, use `dask_ml.joblib` and your favorite scikit-learn estimator\n",
    "* For large datasets, use `dask_ml` estimators"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Scikit-Learn in 5 Minutes\n",
    "\n",
    "Scikit-Learn has a nice, consistent API.\n",
    "\n",
    "1. You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the models *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.\n",
    "2. You call `estimator.fit(X, y)` to train the estimator.\n",
    "3. Use `estimator` to inspect attributes, make predictions, etc. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's generate some random data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.datasets import make_classification\n",
    "\n",
    "X, y = make_classification(n_samples=10000, n_features=4, random_state=0)\n",
    "X[:8]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "y[:8]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll fit a LogisitcRegression."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.svm import SVC"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the estimator and fit it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = SVC(random_state=0)\n",
    "estimator.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Inspect the learned attributes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.support_vectors_[:4]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Check the accuracy."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.score(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Hyperparameters"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Most models have *hyperparameters*. They affect the fit, but are specified up front instead of learned during training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = SVC(C=0.00001, shrinking=False, random_state=0)\n",
    "estimator.fit(X, y)\n",
    "estimator.support_vectors_[:4]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.score(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Hyperparameter Optimization\n",
    "\n",
    "There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.\n",
    "As the name implies, this does a brute-force search over a grid of hyperparameter combinations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.model_selection import GridSearchCV"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "estimator = SVC(gamma='auto', random_state=0, probability=True)\n",
    "param_grid = {\n",
    "    'C': [0.001, 10.0],\n",
    "    'kernel': ['rbf', 'poly'],\n",
    "}\n",
    "\n",
    "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)\n",
    "grid_search.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Single-machine parallelism with scikit-learn\n",
    "\n",
    "![](static/sklearn-parallel.png)\n",
    "\n",
    "Scikit-Learn has nice *single-machine* parallelism, via Joblib.\n",
    "Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword.\n",
    "This controls the number of CPU cores that will be used."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)\n",
    "grid_search.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Multi-machine parallelism with Dask\n",
    "\n",
    "![](static/sklearn-parallel-dask.png)\n",
    "\n",
    "Dask can talk to scikit-learn (via joblib) so that your *cluster* is used to train a model. \n",
    "\n",
    "If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a disrtibuted cluster. That would mean putting something in the call to `Client` something like\n",
    "\n",
    "```\n",
    "c = Client('tcp://my.scheduler.address:8786')\n",
    "```\n",
    "\n",
    "Details on the many ways to create a cluster can be found [here](http://distributed.readthedocs.io/en/latest/setup.html)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's try it on a larger problem (more hyperparameters)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# follow insluctions to know what to pass to Client!\n",
    "import dask.distributed\n",
    "c = dask.distributed.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "param_grid = {\n",
    "    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],\n",
    "    'kernel': ['rbf', 'poly', 'linear'],\n",
    "    'shrinking': [True, False],\n",
    "}\n",
    "\n",
    "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)\n",
    "\n",
    "with joblib.parallel_backend(\"dask\", scatter=[X, y]):\n",
    "    grid_search.fit(X, y)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search.best_params_, grid_search.best_score_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Training on Large Datasets\n",
    "\n",
    "Sometimes you'll want to train on a larger than memory dataset. `dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine's RAM."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.array as da\n",
    "import dask.delayed\n",
    "from sklearn.datasets import make_blobs\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll make a small (random) dataset locally using scikit-learn."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "n_centers = 12\n",
    "n_features = 20\n",
    "\n",
    "X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)\n",
    "\n",
    "centers = np.zeros((n_centers, n_features))\n",
    "\n",
    "for i in range(n_centers):\n",
    "    centers[i] = X_small[y_small == i].mean(0)\n",
    "    \n",
    "centers[:4]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The small dataset will be the template for our large random dataset.\n",
    "We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "n_samples_per_block = 200000\n",
    "n_blocks = 500\n",
    "\n",
    "delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,\n",
    "                                     centers=centers,\n",
    "                                     n_features=n_features,\n",
    "                                     random_state=i)[0]\n",
    "            for i in range(n_blocks)]\n",
    "arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)\n",
    "          for obj in delayeds]\n",
    "X = da.concatenate(arrays)\n",
    "X"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X.nbytes / 1e9"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X = X.persist()  # Only run this on the cluster."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.\n",
    "\n",
    "They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_ml.cluster import KMeans"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf = KMeans(init_max_iter=3, oversampling_factor=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time clf.fit(X)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.labels_"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.labels_[:10].compute()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
